package com.anji.plus.gaea.job.executor.thread;


import com.anji.plus.gaea.job.core.dto.ReturnT;
import com.anji.plus.gaea.job.core.param.HandleCallbackParam;
import com.anji.plus.gaea.job.core.param.TriggerParam;
import com.anji.plus.gaea.job.executor.config.ExecutorConfiguration;
import com.anji.plus.gaea.job.executor.handler.JobHandler;
import com.anji.plus.gaea.job.executor.handler.param.JobInfo;
import com.anji.plus.gaea.job.executor.service.XxlJobExecutor;
import com.anji.plus.gaea.job.executor.util.XxlJobContext;
import com.anji.plus.gaea.job.executor.util.XxlJobFileAppender;
import com.anji.plus.gaea.job.executor.util.XxlJobHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.*;
import java.util.concurrent.*;

/**
 * Worker thread dedicated to a single job id.
 *
 * <p>The thread polls its trigger queue, runs the {@link JobHandler} for every trigger
 * (page by page, inside one manually managed transaction), and pushes the outcome to
 * {@link TriggerCallbackThread}. It exits when {@link #toStop(String)} is called; triggers
 * still queued at that point are reported back as failed.
 *
 * @author xuxueli 2016-1-16 19:52:47
 *
 * Borrowed from xxljob v2.4.0
 */
public class JobThread extends Thread{
    private static final Logger logger = LoggerFactory.getLogger(JobThread.class);

    private long jobId;
    private JobHandler handler;
    private LinkedBlockingQueue<TriggerParam> triggerQueue;
    private Set<Long> triggerLogIdSet;      // avoid repeat trigger for the same TRIGGER_LOG_ID

    private volatile boolean toStop = false;
    // Written before the volatile 'toStop' flag, so a reader that sees toStop == true also sees the reason.
    private String stopReason;

    // Written by this thread, read by other threads through isRunningOrHasQueue(), hence volatile.
    private volatile boolean running = false;    // true while a trigger is being executed
    private int idleTimes = 0;                   // consecutive empty polls; only touched by this thread

    protected DataSourceTransactionManager dataSourceTransactionManager;
    protected TransactionDefinition transactionDefinition;

    /**
     * Creates the worker for one job and resolves the transaction beans from the
     * executor's application context.
     *
     * @param jobId   id of the job this thread serves
     * @param handler handler that supplies and processes the job data
     */
    public JobThread(long jobId, JobHandler handler) {
        this.jobId = jobId;
        this.handler = handler;
        this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
        this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());


        this.dataSourceTransactionManager = ExecutorConfiguration.getInstance().getApplicationContext().getBean(DataSourceTransactionManager.class);
        this.transactionDefinition = ExecutorConfiguration.getInstance().getApplicationContext().getBean(TransactionDefinition.class);

        // assign job thread name
        this.setName("xxl-job, JobThread-"+jobId+"-"+System.currentTimeMillis());
    }

    /**
     * @return the handler this thread executes
     */
    public JobHandler getHandler() {
        return handler;
    }

    /**
     * Adds a new trigger to the queue unless a trigger with the same log id is already pending.
     *
     * @param triggerParam trigger to enqueue
     * @return {@link ReturnT#SUCCESS} when queued, a {@code FAIL_CODE} result for a repeated log id
     */
    public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
        // Single atomic add on the synchronized set: a separate contains()/add() pair would let
        // two concurrent callers both pass the check and enqueue the same log id twice.
        if (!triggerLogIdSet.add(triggerParam.getLogId())) {
            logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
            return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
        }

        triggerQueue.add(triggerParam);
        return ReturnT.SUCCESS;
    }

    /**
     * Asks the job thread to stop.
     *
     * @param stopReason reason reported in the callbacks of killed/unexecuted triggers
     */
    public void toStop(String stopReason) {
        /*
         * Thread.interrupt only breaks a thread out of a blocking state (wait, join, sleep) by
         * raising InterruptedException there; it does not terminate a running thread.
         * Stopping this thread for good therefore relies on a shared flag.
         */
        // Publish the reason first: the volatile write to 'toStop' below makes it visible
        // to the worker as soon as it observes the flag.
        this.stopReason = stopReason;
        this.toStop = true;
    }

    /**
     * @return {@code true} when a trigger is currently executing or at least one is queued
     */
    public boolean isRunningOrHasQueue() {
        return running || triggerQueue.size()>0;
    }

    /**
     * Main loop: poll a trigger, execute it (optionally bounded by the trigger's timeout),
     * validate the handle result and push the callback. After the stop flag is seen, every
     * trigger left in the queue is reported as failed.
     */
    @Override
    public void run() {
        // execute
        while(!toStop){
            running = false;
            idleTimes++;

            TriggerParam triggerParam = null;
            try {
                // poll with a timeout instead of take(), so the toStop flag is re-checked periodically
                triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
                if (triggerParam!=null) {
                    running = true;
                    idleTimes = 0;
                    triggerLogIdSet.remove(triggerParam.getLogId());

                    // log filename, like "logPath/yyyy-MM-dd/9999.log"
                    String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());

                    XxlJobContext xxlJobContext = new XxlJobContext(
                            triggerParam.getJobId(),
                            triggerParam.getJobParams(),
                            logFileName);

                    // init job context
                    XxlJobContext.setXxlJobContext(xxlJobContext);

                    // execute
                    XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());

                    JobInfo jobInfo = new JobInfo();
                    jobInfo.setJobId(triggerParam.getJobId());
                    jobInfo.setTenantCode(triggerParam.getTenantCode());
                    jobInfo.setOrgCode(triggerParam.getOrgCode());
                    jobInfo.setExecutorCode(triggerParam.getExecutorCode());
                    jobInfo.setJobName(triggerParam.getJobName());
                    jobInfo.setJobParams(triggerParam.getJobParams());
                    jobInfo.setJobBatch(triggerParam.getJobBatch());

                    if (triggerParam.getJobTimeout() > 0) {
                        // limit timeout: run the job on a helper thread and wait at most jobTimeout seconds
                        Thread futureThread = null;
                        try {
                            FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                                @Override
                                public Boolean call() throws Exception {
                                    // init job context on the helper thread as well
                                    XxlJobContext.setXxlJobContext(xxlJobContext);

                                    executeJob(jobInfo);

                                    return true;
                                }
                            });
                            futureThread = new Thread(futureTask);
                            futureThread.start();

                            // a job failure surfaces here as ExecutionException and is handled by the outer catch
                            futureTask.get(triggerParam.getJobTimeout(), TimeUnit.SECONDS);
                        } catch (TimeoutException e) {

                            XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                            XxlJobHelper.log(e);

                            // handle result
                            XxlJobHelper.handleTimeout("job execute timeout ");
                        } finally {
                            // null when the helper thread could not be created; do not hide that error with an NPE
                            if (futureThread != null) {
                                futureThread.interrupt();
                            }
                        }
                    } else {
                        // just execute
                        executeJob(jobInfo);
                    }

                    // valid execute handle data
                    if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                        XxlJobHelper.handleFail("job handle result lost.");
                    } else {
                        // cap the handle message at 50000 characters
                        String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                        tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
                                ?tempHandleMsg.substring(0, 50000).concat("...")
                                :tempHandleMsg;
                        XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                    }
                    XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                            + XxlJobContext.getXxlJobContext().getHandleCode()
                            + ", handleMsg = "
                            + XxlJobContext.getXxlJobContext().getHandleMsg()
                    );

                } else {
                    // more than 30 consecutive empty 3-second polls: retire this idle thread
                    if (idleTimes > 30) {
                        if(triggerQueue.size() == 0) {  // avoid concurrent trigger causes jobId-lost
                            XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                        }
                    }
                }
            } catch (Throwable e) {
                if (toStop) {
                    XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
                }

                // handle result
                StringWriter stringWriter = new StringWriter();
                e.printStackTrace(new PrintWriter(stringWriter));
                String errorMsg = stringWriter.toString();

                XxlJobHelper.handleFail(errorMsg);

                XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
            } finally {
                if(triggerParam != null) {
                    // callback handler info
                    if (!toStop) {
                        // common case: report the handle result stored in the job context
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                                triggerParam.getLogId(),
                                triggerParam.getLogDateTime(),
                                XxlJobContext.getXxlJobContext().getHandleCode(),
                                XxlJobContext.getXxlJobContext().getHandleMsg() )
                        );
                    } else {
                        // is killed
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                                triggerParam.getLogId(),
                                triggerParam.getLogDateTime(),
                                XxlJobContext.HANDLE_CODE_FAIL,
                                stopReason + " [job running, killed]" )
                        );
                    }
                }
            }
        }

        // callback trigger request in queue
        while(triggerQueue !=null && triggerQueue.size()>0){
            TriggerParam triggerParam = triggerQueue.poll();
            if (triggerParam!=null) {
                // is killed
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        triggerParam.getLogId(),
                        triggerParam.getLogDateTime(),
                        XxlJobContext.HANDLE_CODE_FAIL,
                        stopReason + " [job not executed, in the job queue, killed.]")
                );
            }
        }

        logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
    }

    /**
     * Runs the handler for one trigger inside a single manually managed transaction.
     *
     * <p>For every element returned by {@code handler.getParamList} the data is processed page
     * by page (page size = {@code jobBatch} when positive, otherwise 200), followed by
     * {@code handler.afterExecute}. The transaction is committed after all elements are done.
     *
     * @param jobInfo description of the job being executed
     * @throws Exception any failure raised while processing; the transaction is rolled back
     *                   first and the exception is rethrown so that {@link #run()} records the
     *                   job as failed instead of the error being silently swallowed
     */
    private void executeJob(JobInfo jobInfo) throws Exception {
        // stays null until the manual transaction has actually been started
        TransactionStatus transactionStatus = null;
        long jobId = jobInfo.getJobId();
        try{
            int pageSize = 200;
            if(jobInfo.getJobBatch() != null && jobInfo.getJobBatch().intValue() > 0) {
                pageSize = jobInfo.getJobBatch().intValue();
            }
            List paramList = handler.getParamList(jobInfo);
            if(paramList == null || paramList.size() == 0) {
                return;
            }

            // start the transaction manually; it spans all objects processed below
            transactionStatus = this.dataSourceTransactionManager.getTransaction(this.transactionDefinition);
            for(Object param: paramList){
                Integer totalCount = handler.getTotalCount(jobInfo, param);
                if(totalCount == null || totalCount.intValue() <1 ) {
                    continue;
                }
                XxlJobHelper.log("本JobId: " + jobId + " 共" + totalCount.intValue() + "条数据");
                // ceiling division: a partially filled last page still counts as a page
                int totalPage = totalCount.intValue() / pageSize;
                if(totalCount.intValue() % pageSize > 0) {
                    totalPage = totalPage + 1;
                }

                // process every page
                for(int pageNum = 1; pageNum <= totalPage; pageNum ++ ) {
                    XxlJobHelper.log("本JobId: "+jobId+" 共"+totalPage+"页，开始查询第"+pageNum+"页数据");
                    List pageList = handler.getPageList(jobInfo, param, pageNum, pageSize);
                    handler.execute(jobInfo, param, pageList);
                    XxlJobHelper.log("本JobId: "+jobId+" 共"+totalPage+"页，第"+pageNum+"页数据处理完成");
                }

                handler.afterExecute(jobInfo, param);
            }
            // commit the transaction
            this.dataSourceTransactionManager.commit(transactionStatus);
            XxlJobHelper.log("本Job: "+jobId+" 处理完成");
        } catch (Exception e){
            // Roll back only a transaction that is still open: when commit() itself failed the
            // transaction is already completed and a second rollback would throw and mask 'e'.
            if(transactionStatus != null && !transactionStatus.isCompleted()){
                try {
                    this.dataSourceTransactionManager.rollback(transactionStatus);
                } catch (RuntimeException rollbackFailure) {
                    e.addSuppressed(rollbackFailure);
                }
            }
            logger.error(">>>>>>>>>>> xxl-job job execute failed, transaction rolled back, jobId:{}", jobId, e);
            // propagate so the caller marks the job as failed
            throw e;
        }
    }
}
